package com.test.rocketmq.controller;

import com.test.rocketmq.constant.TxProducerGroupConstant;
import com.test.rocketmq.domain.OrderPaidEvent;
import com.test.rocketmq.constant.MessageDelayLevel;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * @Author: huangkunming
 * @Date: 2021/05/03 11:33
 * @Description: rocketMQ测试接口
 */
@RestController
@RequestMapping("/rocketMq")
@Slf4j
public class MQMessageController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 同步发送消息
     */
    @RequestMapping("/sync")
    public void syncSend() {
        String objectTopic = "object-topic";
        OrderPaidEvent orderPaidEvent = new OrderPaidEvent();
        orderPaidEvent.setOrderNo("123456789");
        orderPaidEvent.setPaidMoney(new BigDecimal(20));
        SendResult sendResultObj = rocketMQTemplate.syncSend(objectTopic, orderPaidEvent);
        log.info("MQ同步发送对象类型的消息topic为:{},返回结果:{}", objectTopic, sendResultObj);
    }

    /**
     * 异步发送消息
     */
    @RequestMapping("/async")
    public void asyncSend() {
        String objectTopic = "object-topic";
        OrderPaidEvent orderPaidEvent = new OrderPaidEvent();
        orderPaidEvent.setOrderNo("123456789");
        orderPaidEvent.setPaidMoney(new BigDecimal(20));
        rocketMQTemplate.asyncSend(objectTopic, orderPaidEvent, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步消息发送成功:{}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("异步消息发送失败:{}", throwable.getCause());
            }
        });
    }

    /**
     * 单向发送消息
     */
    @RequestMapping("/oneWay")
    public void oneWaySend() {
        String objectTopic = "object-topic";
        OrderPaidEvent orderPaidEvent = new OrderPaidEvent();
        orderPaidEvent.setOrderNo("123456789");
        orderPaidEvent.setPaidMoney(new BigDecimal(20));
        rocketMQTemplate.sendOneWay(objectTopic, orderPaidEvent);
    }

    /**
     * 延迟发送消息
     */
    @RequestMapping("/delay")
    public void delaySend() {
        String objectTopic = "object-topic";
        OrderPaidEvent orderPaidEvent = new OrderPaidEvent();
        orderPaidEvent.setOrderNo("123456789");
        orderPaidEvent.setPaidMoney(new BigDecimal(20));
        Message<OrderPaidEvent> message = MessageBuilder.withPayload(orderPaidEvent).build();
        // 这里的TIME_5S枚举意思是延迟5s发送消息
        SendResult sendResult = rocketMQTemplate.syncSend(objectTopic, message, 1000, MessageDelayLevel.TIME_5S);
        log.info("发送延迟消息返回结果:{}", sendResult);
    }

    /**
     * 顺序发送消息
     */
    @RequestMapping("/orderly")
    public void orderlySend() {
        String orderlyTopic = "orderly-topic";
        for (int i = 0; i < 5; i++) {
            OrderPaidEvent orderPaidEvent = new OrderPaidEvent();
            orderPaidEvent.setOrderNo("123456789" + i);
            orderPaidEvent.setPaidMoney(new BigDecimal(20));
            // syncSendOrderly(String destination, Object payload, String hashKey)
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(orderlyTopic, orderPaidEvent, "秒杀订单顺序队列");
            log.info("同步发送顺序消息返回结果:{}", sendResult);
        }
    }

    /**
     * 过滤发送消息
     */
    @RequestMapping("/filter")
    public void filterSend() {
        String objectTopic = "object-topic";
        OrderPaidEvent orderPaidEvent = new OrderPaidEvent();
        orderPaidEvent.setOrderNo("123456");
        String tag = ":china-order";
        rocketMQTemplate.syncSend(objectTopic + tag, orderPaidEvent);
    }

    /**
     * 批量发送消息
     */
    @RequestMapping("/batch")
    public void batchSend() {
        String batchTopic = "batch-topic";
        List<Message> messages = new ArrayList<>();
        // 消息
        OrderPaidEvent orderPaidEvent_A = new OrderPaidEvent();
        orderPaidEvent_A.setOrderNo("A-123456");
        // 构建 Spring Messaging 定义的 Message 消息
        messages.add(MessageBuilder.withPayload(orderPaidEvent_A).build());

        OrderPaidEvent orderPaidEvent_B = new OrderPaidEvent();
        orderPaidEvent_A.setOrderNo("B-123456");
        messages.add(MessageBuilder.withPayload(orderPaidEvent_B).build());

        OrderPaidEvent orderPaidEvent_C = new OrderPaidEvent();
        orderPaidEvent_A.setOrderNo("C-123456");
        messages.add(MessageBuilder.withPayload(orderPaidEvent_C).build());

        // 同步批量发送消息
        rocketMQTemplate.syncSend(batchTopic, messages, 30 * 1000L);
    }

    /**
     * 事务发送消息
     */
    @RequestMapping("/transaction")
    public void transactionSend() {
        // 生产者组
        String txProducerGroup = TxProducerGroupConstant.ORDER_TX_PRODUCER_GROUP;
        // topic
        String topic = "tx-topic";

        OrderPaidEvent orderPaidEvent = new OrderPaidEvent();
        orderPaidEvent.setOrderNo("123456789");
        orderPaidEvent.setPaidMoney(new BigDecimal(20));

        String transactionId = UUID.randomUUID().toString();
        Message<OrderPaidEvent> message = MessageBuilder
                .withPayload(orderPaidEvent)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .setHeader(RocketMQHeaders.KEYS, orderPaidEvent.getOrderNo())
                .build();

        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, null);
        if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)
                && sendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            log.info("消息发送成功，并且本地事务执行成功");
        }
    }

}
